查看原文
其他

一款支持自动流水线和客户端缓存的 Go 语言 Redis 客户端

源自开发者 源自开发者
2024-08-28

Rueidis 是一款高性能的 Go 语言 Redis 客户端,它支持自动流水线操作和服务端辅助客户端缓存等功能。Rueidis 的目标是提供一个简单易用、性能卓越的 Redis 客户端库,以满足 Go 开发者的各种需求。

主要功能

  • 自动流水线操作,提升非阻塞命令的执行效率
  • 服务端辅助客户端缓存,大幅降低延迟和提高吞吐量
  • 支持泛型对象映射和客户端缓存
  • 提供缓存策略模式(Cache-Aside pattern)的实现
  • 支持分布式锁和客户端缓存
  • 提供用于编写测试的 Rueidis 模拟库
  • 集成 OpenTelemetry,方便监控和追踪
  • 支持 Pub/Sub、分片 Pub/Sub 和 Streams
  • 兼容 Redis Cluster、Sentinel、RedisJSON、RedisBloom、RediSearch、RedisTimeseries 等
  • 提供不依赖 Redis Stack 的概率数据结构

快速入门

package main

import (
 "context"
 "github.com/redis/rueidis"
)

func main() {
 client, err := rueidis.NewClient(rueidis.ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
 if err != nil {
  panic(err)
 }
 defer client.Close()

 ctx := context.Background()
 // SET key val NX
 err = client.Do(ctx, client.B().Set().Key("key").Value("val").Nx().Build()).Error()
 // HGETALL hm
 hm, err := client.Do(ctx, client.B().Hgetall().Key("hm").Build()).AsStrMap()
}

命令构建器

Rueidis 提供了简单易用的命令构建器,方便开发者构建 Redis 命令。

client.B().Set().Key("key").Value("val").Nx().Build()

流水线操作

自动流水线操作

Rueidis 会自动将并发执行的非阻塞 Redis 命令进行流水线操作,以减少网络往返次数和系统调用次数,从而提高吞吐量。

func BenchmarkPipelining(b *testing.B, client rueidis.Client) {
 b.RunParallel(func(pb *testing.PB) {
  for pb.Next() {
   client.Do(context.Background(), client.B().Get().Key("k").Build()).ToString()
  }
 })
}

手动流水线操作

除了自动流水线操作外,Rueidis 还支持手动流水线操作。

cmds := make(rueidis.Commands, 010)
for i := 0; i < 10; i++ {
    cmds = append(cmds, client.B().Set().Key("key").Value("value").Build())
}
for _, resp := range client.DoMulti(ctx, cmds...) {
    if err := resp.Error(); err != nil {
        panic(err)
    }
}

服务端辅助客户端缓存

Rueidis 默认开启了服务端辅助客户端缓存的 opt-in 模式,开发者可以通过 DoCache()DoMultiCache() 方法使用该功能。

client.DoCache(ctx, client.B().Hmget().Key("mk").Field("1""2").Cache(), time.Minute).ToArray()
client.DoMultiCache(ctx,
    rueidis.CT(client.B().Get().Key("k1").Cache(), 1*time.Minute),
    rueidis.CT(client.B().Get().Key("k2").Cache(), 2*time.Minute))

上下文取消

client.Do()client.DoMulti()client.DoCache()client.DoMultiCache() 方法都支持上下文取消,如果上下文被取消或超时,这些方法会提前返回。

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
client.Do(ctx, client.B().Set().Key("key").Value("val").Nx().Build()).Error() == context.DeadlineExceeded

发布/订阅

Rueidis 提供了 client.Receive() 方法来接收来自 Redis 频道的信息。

err = client.Receive(context.Background(), client.B().Subscribe().Channel("ch1""ch2").Build(), func(msg rueidis.PubSubMessage) {
    // 处理接收到的信息
})

CAS 事务

Rueidis 支持 CAS 事务,即 WATCH + MULTI + EXEC 操作。

client.Dedicated(func(c rueidis.DedicatedClient) error {
    c.Do(ctx, c.B().Watch().Key("k1""k2").Build())
    c.Do(ctx, c.B().Mget().Key("k1""k2").Build())
    c.DoMulti(
        ctx,
        c.B().Multi().Build(),
        c.B().Set().Key("k1").Value("1").Build(),
        c.B().Set().Key("k2").Value("2").Build(),
        c.B().Exec().Build(),
    )
    return nil
})

Lua 脚本

Rueidis 提供了 NewLuaScriptNewLuaScriptReadOnly 方法来创建 Lua 脚本。

script := rueidis.NewLuaScript("return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}")
list, err := script.Exec(ctx, client, []string{"k1""k2"}, []string{"a1""a2"}).ToArray()

流式读取

Rueidis 提供了 client.DoStream()client.DoMultiStream() 方法来流式读取 Redis 的响应数据。

s := client.DoMultiStream(ctx, client.B().Get().Key("a{slot1}").Build(), client.B().Get().Key("b{slot1}").Build())
for s.HasNext() {
    n, err := s.WriteTo(io.Discard)
    if rueidis.IsRedisNil(err) {
        // ...
    }
}

内存消耗

Rueidis 的每个底层连接都会分配一个环形缓冲区用于流水线操作,缓冲区的大小由 ClientOption.RingScaleEachConn 选项控制,默认值为 10,即每个环形缓冲区的大小为 2^10 字节。

如果 Rueidis 连接过多,可能会占用大量内存。在这种情况下,可以考虑将 ClientOption.RingScaleEachConn 选项的值减小到 8 或 9,但这样做可能会降低吞吐量。

创建 Redis 客户端

可以使用 NewClient 函数创建 Rueidis 客户端,并指定各种选项。

// 连接到单个 Redis 节点
client, err := rueidis.NewClient(rueidis.ClientOption{
    InitAddress: []string{"127.0.0.1:6379"},
})

// 连接到 Redis 集群
client, err := rueidis.NewClient(rueidis.ClientOption{
    InitAddress: []string{"127.0.0.1:7001""127.0.0.1:7002""127.0.0.1:7003"},
    ShuffleInit: true,
})

Redis URL

可以使用 ParseURLMustParseURL 函数解析 Redis URL,并将其转换为 ClientOption 结构体。

// 连接到 Redis 集群
client, err = rueidis.NewClient(rueidis.MustParseURL("redis://127.0.0.1:7001?addr=127.0.0.1:7002&addr=127.0.0.1:7003"))

任意命令

如果需要构建 Rueidis 命令构建器中没有提供的 Redis 命令,可以使用 client.B().Arbitrary() 方法。

// 这将生成 [ANY CMD k1 k2 a1 a2] 命令
client.B().Arbitrary("ANY""CMD").Keys("k1""k2").Args("a1""a2").Build()

处理 JSON、原始字节数组和向量相似度搜索

Rueidis 命令构建器将所有参数都视为 Redis 字符串,这意味着开发者可以将 []byte 直接存储到 Redis 中,而无需进行转换。

client.B().Set().Key("b").Value(rueidis.BinaryString([]byte{...})).Build()

命令响应解析

Rueidis 提供了一系列方法来解析 Redis 命令的响应数据,例如 ToString()AsInt64()ToArray() 等。

// GET 命令
client.Do(ctx, client.B().Get().Key("k").Build()).ToString()
client.Do(ctx, client.B().Get().Key("k").Build()).AsInt64()

使用 DecodeSliceOfJSON 解析 JSON 数组

DecodeSliceOfJSON 函数可以将 Redis 响应中的 JSON 数组解析为 Go 结构体切片。

type User struct {
 Name string `json:"name"`
}

// ...

var users []*User
if err := rueidis.DecodeSliceOfJSON(client.Do(ctx, client.B().Mget().Key("user1""user2").Build()), &users); err != nil {
 return err
}
文章精选

使用 Go 语言连接并操作 SQLite 数据库

Go语言官方团队推荐的依赖注入工具

替代zap,Go语言官方实现的结构化日志包

Go语言常见错误 | 不使用function option模式

必看| Go语言项目结构最佳实践


点击关注并扫码添加进交流群
领取「Go 语言」学习资料

继续滑动看下一个
源自开发者
向上滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存